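Concurrency: multiple threads take turns operating on the same resource.
Parallelism: multiple threads operate on different resources at the same time.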
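concurrent.futures is a module introduced in Python 3.2. It is a general abstraction layer on top of threading and multiprocessing, and it provides two classes, ThreadPoolExecutor and ProcessPoolExecutor, for running tasks concurrently on a thread pool or a process pool.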
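Both classes derive from the same Executor base class, so code written against one can usually be switched to the other by changing a single name. A minimal sketch of that idea, where `square` and `run_with` are hypothetical names chosen only for illustration:

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def square(x):
    # Hypothetical task used only for illustration
    return x * x


def run_with(executor_cls, items):
    # Works with any Executor subclass, because map() and the
    # context-manager protocol are defined on the shared base class
    with executor_cls(max_workers=2) as pool:
        return list(pool.map(square, items))


thread_results = run_with(ThreadPoolExecutor, [1, 2, 3])
print(thread_results)
```

Passing ProcessPoolExecutor instead should give the same result, but that call belongs under an `if __name__ == "__main__":` guard, because worker processes may re-import the main module.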
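The map and submit methods
The map method is the simpler of the two: it returns results in the same order as the input arguments.
The future pattern, based on submit, is more powerful: with as_completed, results are yielded as tasks finish, so their order is not fixed.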
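The ordering difference can be seen without any network access. In this small sketch, `slow_echo` is a hypothetical task that sleeps for a given delay, and the delays are arbitrary values chosen so that the tasks finish in a different order than they were submitted:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_echo(delay):
    # Hypothetical task: sleep for `delay` seconds, then return it
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as pool:
    # map yields results in input order, regardless of which finished first
    mapped = list(pool.map(slow_echo, delays))

    # submit returns Future objects; as_completed yields them as they finish
    futures = [pool.submit(slow_echo, d) for d in delays]
    completed = [f.result() for f in as_completed(futures)]

print("map:         ", mapped)
print("as_completed:", completed)
```

On a typical run the second list comes out shortest delay first, but that order depends on scheduling and is not guaranteed.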
import httpx
from parsel import Selector
from concurrent.futures import ThreadPoolExecutor, as_completed

# The ten list pages to crawl
urls = [f"https://news.cnblogs.com/n/page/{page}" for page in range(1, 10 + 1)]


def craw(url):
    # Download one page and return its HTML
    r = httpx.get(url)
    return r.text


def parse(html):
    # Extract the title of the first news entry on the page
    selector = Selector(html)
    title = selector.css(".content h2.news_entry a::text")[0].get()
    return title


# map: results come back in the same order as urls
with ThreadPoolExecutor() as pool:
    htmls = pool.map(craw, urls)
    htmls = list(zip(urls, htmls))
    for url, html in htmls:
        print(url, len(html))
print("craw over")

# submit + as_completed: results arrive as each task finishes
with ThreadPoolExecutor() as pool:
    futures = {}
    for url, html in htmls:
        future = pool.submit(parse, html)
        futures[future] = url
    for future in as_completed(futures):
        url = futures[future]
        print(url, future.result())
print("parse over")
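One practical consequence of the submit pattern is that each future carries its own outcome: `future.result()` re-raises whatever exception the task raised, so a failure can be handled per URL instead of aborting the whole batch. A minimal offline sketch, in which `fake_fetch` and the example.com URLs are hypothetical stand-ins for the real `craw` call:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_fetch(url):
    # Hypothetical stand-in for a network call; fails for one URL
    if "bad" in url:
        raise ValueError(f"cannot fetch {url}")
    return f"<html>{url}</html>"


urls = [
    "https://example.com/1",
    "https://example.com/bad",
    "https://example.com/3",
]
ok, failed = {}, {}

with ThreadPoolExecutor() as pool:
    futures = {pool.submit(fake_fetch, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            ok[url] = future.result()  # re-raises the task's exception, if any
        except ValueError as exc:
            failed[url] = str(exc)

print(len(ok), "succeeded;", len(failed), "failed")
```

With `pool.map`, by contrast, the exception surfaces while iterating over the results, and the remaining results cannot be read from that iterator afterwards.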
Speeding up a web service with multiple threads
import flask
import json
import time
from concurrent.futures import ThreadPoolExecutor

app = flask.Flask(__name__)
# One shared pool for the whole application
pool = ThreadPoolExecutor()


def read_file():
    time.sleep(0.1)  # simulate file I/O
    return "file result"


def read_db():
    time.sleep(0.2)  # simulate a database query
    return "db result"


def read_api():
    time.sleep(0.1)  # simulate a remote API call
    return "api result"


@app.route("/")
def index():
    # Start the three reads concurrently, then wait for each result
    result_file = pool.submit(read_file)
    result_db = pool.submit(read_db)
    result_api = pool.submit(read_api)
    return json.dumps(
        {
            "result_file": result_file.result(),
            "result_db": result_db.result(),
            "result_api": result_api.result(),
        }
    )


if __name__ == "__main__":
    app.run()
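The benefit can be checked without starting Flask. The sketch below submits three simulated reads with the same delays as the handler and measures the wall-clock time; `slow_read` and `fetch_all` are hypothetical helper names. Run one after another, the reads would take about 0.4 s; submitted together, the total should be close to the slowest one, about 0.2 s.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_read(name, delay):
    # Hypothetical stand-in for a blocking file, database, or API read
    time.sleep(delay)
    return f"{name} result"


def fetch_all(pool):
    # Submit everything first so the reads overlap, then collect results
    futures = {
        "result_file": pool.submit(slow_read, "file", 0.1),
        "result_db": pool.submit(slow_read, "db", 0.2),
        "result_api": pool.submit(slow_read, "api", 0.1),
    }
    return {key: future.result() for key, future in futures.items()}


with ThreadPoolExecutor(max_workers=3) as pool:
    start = time.perf_counter()
    results = fetch_all(pool)
    elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s")
```

The important detail is the order of operations: calling `result()` immediately after each `submit` would serialize the reads again.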
Speeding up a web service with multiple processes
from flask import Flask
import json
import math
from concurrent.futures import ProcessPoolExecutor

app = Flask(__name__)


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    # Include sqrt_n itself, otherwise squares such as 9 or 25 pass as prime
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


@app.route("/is-prime/<numbers>")
def api_is_prime(numbers: str):
    # numbers is a comma-separated list, e.g. /is-prime/1,2,3
    number_list = [int(x) for x in numbers.split(",")]
    results = process_pool.map(is_prime, number_list)
    return json.dumps(dict(zip(number_list, results)))


if __name__ == "__main__":
    # Create the pool under the main guard, after is_prime is defined
    process_pool = ProcessPoolExecutor()
    app.run()
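Trial division has to test candidate divisors up to and including the integer square root of n; otherwise squares of odd primes such as 9 or 25 are reported as prime. The sketch below is one way to write the check with `math.isqrt` (Python 3.8+), which avoids floating-point rounding, and to run it through `ProcessPoolExecutor.map` outside Flask. The sample numbers are arbitrary.

```python
import math
from concurrent.futures import ProcessPoolExecutor


def is_prime(n):
    if n < 2:
        return False
    if n < 4:
        return True  # 2 and 3
    if n % 2 == 0:
        return False
    # range's upper bound is exclusive, so add 1 to test isqrt(n) itself
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True


if __name__ == "__main__":
    numbers = [2, 9, 25, 97, 7919]
    # The checks are distributed across worker processes
    with ProcessPoolExecutor() as pool:
        for number, prime in zip(numbers, pool.map(is_prime, numbers)):
            print(number, prime)
```

The function is defined at module level so that it can be pickled and sent to the workers, and the pool is created under the main guard for the same reason as in the Flask example.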